
Conversation

@LCarmi (Contributor) commented Nov 28, 2025

Description

Changes the self.buffers data structure in MultiSyncDataCollector from a dict to a list, in order to ensure that the returned batch respects worker_id order.

Motivation and Context

The main motivation behind this change is to enable id-based retrieval of experience when sampling from multiple, possibly different environments/policies. In more detail, if the user specifies a list of environments/policies, this change guarantees that batch[i] corresponds to the experience sampled from the i-th environment and policy.

This guarantee was not previously available because self.buffers was a dict, and the worker processes return their experience in an unpredictable order due to varying latency. Since a dict is iterated in insertion order, iteration followed arrival order rather than worker id, so the relative order between worker ids was not maintained.

Note: if preemption is enabled, the results only maintain relative order, since preempted workers leave "bubbles" (missing entries) in the returned batch.
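
As a minimal, self-contained sketch of the ordering issue (toy data only; this is not the collector code, and the arrival order is made up):

arrival_order = [2, 0, 1]  # hypothetical order in which 3 workers finish

# Old behaviour: a dict keyed by worker id preserves *insertion* order,
# so iterating it yields workers in arrival order, not worker-id order.
buffers_dict = {}
for worker_id in arrival_order:
    buffers_dict[worker_id] = f"data from worker {worker_id}"
print(list(buffers_dict))  # [2, 0, 1]

# New behaviour: a list indexed by worker id is position-stable,
# so batch[i] always corresponds to worker i regardless of arrival order.
buffers_list = [None] * 3
for worker_id in arrival_order:
    buffers_list[worker_id] = f"data from worker {worker_id}"
print(buffers_list)  # index i holds worker i's data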

Types of changes

What types of changes does your code introduce? Remove all that do not apply:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds core functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation (update in the documentation)
  • Example (update in the folder of examples)

Checklist

Go over all the following points, and put an x in all the boxes that apply.
If you are unsure about any of these, don't hesitate to ask. We are here to help!

  • I have read the CONTRIBUTION guide (required)
  • [~] My change requires a change to the documentation.
  • I have updated the tests accordingly (required for a bug fix or a new feature).
  • [~] I have updated the documentation accordingly.

@pytorch-bot bot commented Nov 28, 2025

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/pytorch/rl/3243

Note: Links to docs will display an error until the docs builds have been completed.

❌ 10 New Failures, 1 Cancelled Job, 10 Unrelated Failures

As of commit c428fd4 with merge base 8570c25:

NEW FAILURES - The following jobs have failed:

CANCELLED JOB - The following job was cancelled. Please retry:

BROKEN TRUNK - The following jobs failed but were present on the merge base:

👉 Rebase onto the `viable/strict` branch to avoid these failures

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@meta-cla bot added the CLA Signed label (this label is managed by the Facebook bot; authors need to sign the CLA before a PR can be reviewed) on Nov 28, 2025
@LCarmi (Contributor, Author) left a comment:

I left comments on some points that are still open.

Moreover, should this guarantee be specified anywhere in the documentation of MultiSyncDataCollector.__init__ or _MultiDataCollector.__init__?

- buffers = {}
- for worker_idx, buffer in self.buffers.items():
+ buffers = [None] * self.num_workers
+ for idx, buffer in enumerate(filter(None.__ne__, self.buffers)):
@LCarmi (Contributor, Author):
We need to filter out buffers that did not return their experience: I use the enumerate(filter(None.__ne__, self.buffers)) idiom to make this compact and hopefully readable; I'm open to better ideas.
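
For reference, a minimal standalone sketch of how the idiom behaves on toy data (not the collector code): note that the index produced by enumerate counts only the surviving entries, which is what the comment below is about.

buffers = ["worker-0-data", None, "worker-2-data"]  # hypothetical: worker 1 returned nothing

for idx, buffer in enumerate(filter(None.__ne__, buffers)):
    print(idx, buffer)
# 0 worker-0-data
# 1 worker-2-data   <- idx is 1, but this buffer belongs to worker 2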

@vmoens (Collaborator):
I think it's ok

@vmoens (Collaborator):
but you define idx, which was already defined earlier (LoC 3829 or 3840). It should be worker_idx, I believe.
See my comment below

Comment on lines -3866 to -3870
# Skip frame counting if this worker didn't send data this iteration
# (happens when reusing buffers or on first iteration with some workers)
if idx not in buffers:
    continue

@LCarmi (Contributor, Author):
I am puzzled by this code, and I don't see where it could happen that idx is defined but the related buffer is None.

An equivalent check here would be if buffers[idx] is None: continue

@vmoens (Collaborator):
This happens during preemption: if we say that we're ok with 80% of the data, it could be that we don't have data for one of the workers, and we just return whatever we have at this stage.
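
To make the preemption case concrete, here is a hypothetical sketch (worker count, data, and frame counts are made up; this is not the collector code): the preempted worker leaves a None bubble in the list, and frame counting simply skips it.

# Hypothetical: 4 workers, worker 2 was preempted and sent nothing back.
buffers = ["batch-0", "batch-1", None, "batch-3"]

frames = 0
for worker_idx, buffer in enumerate(buffers):
    if buffer is None:      # preemption bubble: no data from this worker this iteration
        continue
    frames += len(buffer)   # stand-in for counting the frames held in `buffer`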

@vmoens (Collaborator) left a comment:

I think these changes make sense, but there seems to be a bug in the idx vs worker_idx naming in the preemption case.
Other than that, LGTM!
